Skip to content

[CORE-16135]: Schema Registry: Factor kafka client usage into new transport interface#30254

Merged
oleiman merged 5 commits intodevfrom
sr/noticket/kafka-transport
May 1, 2026
Merged

[CORE-16135]: Schema Registry: Factor kafka client usage into new transport interface#30254
oleiman merged 5 commits intodevfrom
sr/noticket/kafka-transport

Conversation

@oleiman
Copy link
Copy Markdown
Member

@oleiman oleiman commented Apr 22, 2026

Introduces a pluggable transport interface for schema registry's internal
_schemas topic I/O, replacing the hardcoded kafka::client dependency with kafka_client_transport.

Backports Required

  • none - not a bug fix
  • none - this is a backport
  • none - issue does not exist in previous branches
  • none - papercut/not impactful enough to backport
  • v26.1.x
  • v25.3.x
  • v25.2.x

Release Notes

  • none

@oleiman oleiman self-assigned this Apr 22, 2026
@oleiman oleiman added the claude-review Adding this label to a PR will trigger a workflow to review the code using claude. label Apr 22, 2026
@oleiman oleiman requested a review from Copilot April 22, 2026 23:37
@oleiman oleiman mentioned this pull request Apr 22, 2026
7 tasks
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Refactors Schema Registry’s internal _schemas topic I/O to go through a new transport abstraction, enabling alternative backends (e.g., RPC-based) while keeping existing Kafka-client behavior available via kafka_client_transport.

Changes:

  • Introduce pandaproxy::schema_registry::transport and factor internal topic operations (produce/consume/HWM/auth/mitigation) behind it.
  • Add kafka_client_transport implementation and wire api, service, and seq_writer to use the transport pointer.
  • Update tests to use a noop_transport and add a collision-simulation transport to exercise delete retry behavior.

Reviewed changes

Copilot reviewed 15 out of 15 changed files in this pull request and generated 5 comments.

Show a summary per file
File Description
src/v/pandaproxy/schema_registry/transport.h Adds the new transport interface used by SR components.
src/v/pandaproxy/schema_registry/kafka_client_transport.h Declares Kafka-client-backed transport implementation.
src/v/pandaproxy/schema_registry/kafka_client_transport.cc Implements Kafka-client-backed transport behaviors.
src/v/pandaproxy/schema_registry/service.h Switches SR service dependency from kafka client to transport.
src/v/pandaproxy/schema_registry/service.cc Routes internal topic init/load through transport and adds retry logic.
src/v/pandaproxy/schema_registry/seq_writer.h Switches sequencer dependency from kafka client to transport; adds delete retry cache state.
src/v/pandaproxy/schema_registry/seq_writer.cc Uses transport for produce/consume/HWM and adds delete collision retry behavior.
src/v/pandaproxy/schema_registry/api.h Replaces sharded Kafka client with sharded Kafka transport.
src/v/pandaproxy/schema_registry/api.cc Wires up transport lifecycle and passes transport pointers to service/sequencer.
src/v/pandaproxy/schema_registry/fwd.h Adds forward declarations for new transport types.
src/v/pandaproxy/schema_registry/BUILD Adds new transport and kafka_client_transport Bazel targets and updates deps.
src/v/pandaproxy/schema_registry/test/utils.h Adds noop_transport for tests that don’t use topic I/O.
src/v/pandaproxy/schema_registry/test/consume_to_store.cc Updates tests to use noop_transport; adds collision transport test.
src/v/pandaproxy/schema_registry/test/compatibility_3rdparty.cc Updates test to use noop_transport instead of dummy Kafka client.
src/v/pandaproxy/schema_registry/test/BUILD Updates test deps to include new transport targets.

Comment thread src/v/pandaproxy/schema_registry/service.cc Outdated
Comment thread src/v/pandaproxy/schema_registry/service.cc Outdated
Comment thread src/v/pandaproxy/schema_registry/transport.h Outdated
Comment thread src/v/pandaproxy/schema_registry/transport.h Outdated
Comment thread src/v/pandaproxy/schema_registry/kafka_client_transport.cc
@oleiman oleiman force-pushed the sr/noticket/kafka-transport branch from 0e4a6d3 to 4502f20 Compare April 23, 2026 04:01
@oleiman oleiman changed the title Schema Registry: Factor kafka client usage into new transport interface [CORE-16135]: Schema Registry: Factor kafka client usage into new transport interface Apr 23, 2026
@oleiman oleiman requested a review from Copilot April 23, 2026 04:10
@oleiman oleiman added claude-review Adding this label to a PR will trigger a workflow to review the code using claude. and removed claude-review Adding this label to a PR will trigger a workflow to review the code using claude. labels Apr 23, 2026
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 15 out of 15 changed files in this pull request and generated 3 comments.

Comments suppressed due to low confidence (1)

src/v/pandaproxy/schema_registry/seq_writer.cc:620

  • _delete_versions_cache is populated before several operations that can throw (e.g., is_referenced, tombstone lookups). If an exception is thrown after caching, the cache persists beyond this delete attempt and a later call for the same subject (when it is already soft-deleted) can incorrectly return this stale cached version list. Consider clearing the cache on any exception path after it is set (e.g., a scope guard / try-catch that resets _delete_versions_cache before rethrowing), or delaying cache population until after all non-retriable validations have passed.
    // Cache versions for potential retry — after a subject-level soft
    // delete all versions are marked deleted and the pre-delete list
    // cannot be reconstructed from the store. Tagged with subject so
    // stale entries from a prior delete of a different subject are ignored.
    _delete_versions_cache.emplace(delete_version_cache{sub, versions.copy()});

    // Check that the subject is not referenced
    if (co_await _store.is_referenced(sub, std::nullopt)) {
        throw as_exception(has_references(sub, versions.back()));
    }

Comment thread src/v/pandaproxy/schema_registry/service.cc Outdated
Comment thread src/v/pandaproxy/schema_registry/seq_writer.cc Outdated
Comment thread src/v/pandaproxy/schema_registry/service.cc Outdated
@oleiman oleiman force-pushed the sr/noticket/kafka-transport branch from 4502f20 to 7c45444 Compare April 23, 2026 04:22
@oleiman oleiman added claude-review Adding this label to a PR will trigger a workflow to review the code using claude. and removed claude-review Adding this label to a PR will trigger a workflow to review the code using claude. labels Apr 23, 2026
Comment thread src/v/pandaproxy/schema_registry/seq_writer.cc Outdated
@oleiman oleiman force-pushed the sr/noticket/kafka-transport branch from 7c45444 to b73bcef Compare April 23, 2026 07:14
@oleiman oleiman marked this pull request as ready for review April 23, 2026 07:14
@oleiman oleiman force-pushed the sr/noticket/kafka-transport branch from b73bcef to 23114c7 Compare April 23, 2026 07:45
@oleiman oleiman requested a review from Copilot April 23, 2026 07:48
@oleiman oleiman added claude-review Adding this label to a PR will trigger a workflow to review the code using claude. and removed claude-review Adding this label to a PR will trigger a workflow to review the code using claude. labels Apr 23, 2026
Comment thread src/v/pandaproxy/schema_registry/kafka_client_transport.cc Outdated
Comment thread src/v/pandaproxy/schema_registry/service.cc Outdated
Comment thread src/v/pandaproxy/schema_registry/seq_writer.h
Comment thread src/v/pandaproxy/schema_registry/kafka_client_transport.cc
Comment thread src/v/pandaproxy/schema_registry/transport.h
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 15 out of 15 changed files in this pull request and generated 2 comments.

Comment thread src/v/pandaproxy/schema_registry/transport.h Outdated
Comment thread src/v/pandaproxy/schema_registry/service.cc
@oleiman oleiman force-pushed the sr/noticket/kafka-transport branch 2 times, most recently from 65cad38 to 01aa966 Compare April 23, 2026 21:09
@vbotbuildovich
Copy link
Copy Markdown
Collaborator

vbotbuildovich commented Apr 23, 2026

CI test results

test results on build#83603
test_status test_class test_method test_arguments test_kind job_url passed reason test_history
FLAKY(PASS) ShadowLinkingReplicationTests test_with_restart {"storage_mode": "tiered_cloud"} integration https://buildkite.com/redpanda/redpanda/builds/83603#019dbc38-a9ce-4b1f-8235-3bb62893af5a 28/31 Test PASSES after retries.No significant increase in flaky rate(baseline=0.0199, p0=0.1192, reject_threshold=0.0100. adj_baseline=0.1000, p1=0.4114, trust_threshold=0.5000) https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=ShadowLinkingReplicationTests&test_method=test_with_restart
test results on build#83618
test_status test_class test_method test_arguments test_kind job_url passed reason test_history
FLAKY(PASS) ShadowLinkingReplicationTests test_replication_basic {"shuffle_leadership": true, "source_cluster_spec": {"cluster_type": "redpanda"}, "storage_mode": "cloud"} integration https://buildkite.com/redpanda/redpanda/builds/83618#019dbd8d-6fb9-4b80-b5d6-6fec5b2d2f30 10/11 Test PASSES after retries.No significant increase in flaky rate(baseline=0.0000, p0=1.0000, reject_threshold=0.0100. adj_baseline=0.1000, p1=0.3487, trust_threshold=0.5000) https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=ShadowLinkingReplicationTests&test_method=test_replication_basic
FLAKY(PASS) ShadowLinkingReplicationTests test_replication_with_failures {"storage_mode": "tiered_cloud"} integration https://buildkite.com/redpanda/redpanda/builds/83618#019dbd8d-6fb8-48d0-9286-b75a5867962a 19/21 Test PASSES after retries.No significant increase in flaky rate(baseline=0.0020, p0=0.0386, reject_threshold=0.0100. adj_baseline=0.1000, p1=0.3917, trust_threshold=0.5000) https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=ShadowLinkingReplicationTests&test_method=test_replication_with_failures
FLAKY(PASS) SimpleEndToEndTest test_relaxed_acks {"write_caching": false} integration https://buildkite.com/redpanda/redpanda/builds/83618#019dbd8e-ea63-44b0-a33f-dd9d47b96693 10/11 Test PASSES after retries.No significant increase in flaky rate(baseline=0.0023, p0=1.0000, reject_threshold=0.0100. adj_baseline=0.1000, p1=0.3487, trust_threshold=0.5000) https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=SimpleEndToEndTest&test_method=test_relaxed_acks
test results on build#83910
test_status test_class test_method test_arguments test_kind job_url passed reason test_history
FLAKY(PASS) ShadowLinkingReplicationTests test_with_restart {"storage_mode": "cloud"} integration https://buildkite.com/redpanda/redpanda/builds/83910#019de07f-d272-4efc-9e90-adac94c9123d 19/21 Test PASSES after retries.No significant increase in flaky rate(baseline=0.0298, p0=0.4535, reject_threshold=0.0100. adj_baseline=0.1000, p1=0.3917, trust_threshold=0.5000) https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=ShadowLinkingReplicationTests&test_method=test_with_restart

@oleiman oleiman force-pushed the sr/noticket/kafka-transport branch from 01aa966 to 709a3c7 Compare April 24, 2026 03:17
@oleiman oleiman marked this pull request as draft April 24, 2026 08:23
@oleiman oleiman marked this pull request as ready for review April 25, 2026 02:18
@oleiman
Copy link
Copy Markdown
Member Author

oleiman commented Apr 25, 2026

@dotnwat @pgellert @nguyen-andrew - I think this one is pretty clean. if you want to see how it maps into the rpc stuff check out #30046

@oleiman
Copy link
Copy Markdown
Member Author

oleiman commented Apr 25, 2026

/ci-repeat 1
skip-redpanda-build
skip-units
skip-rebase
dt-repeat=5
tests/rptest/tests/schema_registry_test.py

constexpr auto max_backoff = 5000ms;
auto backoff = 100ms;
for (int attempts = 0;; ++attempts) {
auto fut = co_await ss::coroutine::as_future(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are co-routine lambdas safe now? I recall an announcement around this but haven't dug in enough yet.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is the current state of play https://clang.llvm.org/extra/clang-tidy/checks/cppcoreguidelines/avoid-capturing-lambda-coroutines.html

tl;dr it's ok if you use explicit this param (or don't capture anything)

Comment thread src/v/pandaproxy/schema_registry/service.cc
Comment thread src/v/pandaproxy/schema_registry/BUILD
Comment thread src/v/pandaproxy/schema_registry/kafka_client_transport.h Outdated
Comment thread src/v/pandaproxy/schema_registry/test/consume_to_store.cc
Comment on lines +587 to +597
// On retry after a write collision the subject may already be
// soft-deleted (by the winning writer). Return the version list
// including deleted, since after a subject-level soft delete all
// versions are marked deleted.
if (co_await _store.is_subject_deleted(sub)) {
co_return std::make_optional(std::move(versions));
co_return co_await _store.get_versions(sub, include_deleted::yes);
}

// Grab the versions before they're gone.
auto versions = co_await _store.get_versions(sub, include_deleted::no);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The commit title for this says that this leads to a "retry crash". How does this lead to a crash exactly? I suspect that it leads to an infinite or at least slow retry chain inside sequenced_write.

I think this new behaviour is incompatible, because we'd also start returning [list-of-all-soft-deleted-subvers] when soft-deleting a subject that is already soft-deleted (regardless if there's a race or not) whereas we are supposed to throw a 404 in that case as far as I can tell.

In the race scenario you describe, I think we're still supposed to return a 404, and the bug is perhaps to do with the logic of sequenced_write. Maybe we should only retry on write_collision errors inside sequenced_write and propagate all other errors immediately.

Am I missing something here?


This is what I was using for testing the behaviour under the scenario you describe where we're soft deleting a subject with all-soft-deleted subject versions:

  # Adjust to taste
  SR=http://localhost:8081
  SUBJECT=test-subject-value
  CT='Content-Type: application/vnd.schemaregistry.v1+json'

  # 1. Register schema v1 (Avro record with one field)
  curl -sS -X POST "$SR/subjects/$SUBJECT/versions" \
    -H "$CT" \
    -d '{"schema":"{\"type\":\"record\",\"name\":\"Foo\",\"fields\":[{\"name\":\"a\",\"type\":\"string\"}]}"}'
  echo

  # 2. Register schema v2 (add a field — backwards-compatible default added)
  curl -sS -X POST "$SR/subjects/$SUBJECT/versions" \
    -H "$CT" \
    -d '{"schema":"{\"type\":\"record\",\"name\":\"Foo\",\"fields\":[{\"name\":\"a\",\"type\":\"string\"},{\"name\":\"b\",\"type\":\"int\",\"default\":0}]}"}'
  echo

  # Sanity check — should list [1, 2]
  curl -sS "$SR/subjects/$SUBJECT/versions"; echo

  # 3. Soft-delete version 1
  curl -sS -X DELETE "$SR/subjects/$SUBJECT/versions/1"; echo

  # 4. Soft-delete version 2
  curl -sS -X DELETE "$SR/subjects/$SUBJECT/versions/2"; echo

  # Live versions list — expect 404 / empty (all soft-deleted)
  curl -sS -i "$SR/subjects/$SUBJECT/versions"; echo

  # Same list including soft-deleted — should still show [1, 2]
  curl -sS "$SR/subjects/$SUBJECT/versions?deleted=true"; echo

  # 5. The interesting call: soft-delete the subject when all versions
  #    are already soft-deleted. Use -i so you see the status code.
  curl -sS -i -X DELETE "$SR/subjects/$SUBJECT"; echo

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should only retry on write_collision errors inside sequenced_write

good idea

Am I missing something?

Don't think so. Added this after some test failure early on in refactoring, and I'm still not sure there was any faulty behavior at all. I kept it in on the off chance you or someone else would say "oh yes, of course that's a bug", but I think I'll just remove it. We can address in a follow up if it seems like there's a real improvement to be made.

Comment thread src/v/pandaproxy/schema_registry/kafka_client_transport.cc Outdated
Comment thread src/v/pandaproxy/schema_registry/kafka_client_transport.cc
Comment thread src/v/pandaproxy/schema_registry/kafka_client_transport.h
Comment thread src/v/pandaproxy/schema_registry/kafka_client_transport.cc
@oleiman oleiman force-pushed the sr/noticket/kafka-transport branch from 709a3c7 to e028da1 Compare April 30, 2026 21:36
oleiman added 5 commits April 30, 2026 14:42
Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
@oleiman
Copy link
Copy Markdown
Member Author

oleiman commented Apr 30, 2026

force push fix PR comments from @pgellert

@oleiman oleiman force-pushed the sr/noticket/kafka-transport branch from e028da1 to 57d80b7 Compare April 30, 2026 22:07
@oleiman
Copy link
Copy Markdown
Member Author

oleiman commented Apr 30, 2026

force push rebase to fix merge conflicts.

@oleiman
Copy link
Copy Markdown
Member Author

oleiman commented May 1, 2026

/cdt
tests/rptest/tests/schema_registry_test.py

@pgellert
Copy link
Copy Markdown
Contributor

pgellert commented May 1, 2026

/cdt
rp_version=build
tests/rptest/tests/schema_registry_test.py

Copy link
Copy Markdown
Contributor

@pgellert pgellert left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm

@oleiman oleiman merged commit 58f0e01 into dev May 1, 2026
22 checks passed
@oleiman oleiman deleted the sr/noticket/kafka-transport branch May 1, 2026 14:18
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area/build area/redpanda claude-review Adding this label to a PR will trigger a workflow to review the code using claude.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants